{
 "metadata": {
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.7.6-final"
  },
  "orig_nbformat": 2,
  "kernelspec": {
   "name": "python3",
   "display_name": "Python 3",
   "language": "python"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2,
 "cells": [
  {
   "source": [
    "# 对测试集test_format1生成特征，用于预测"
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .master(\"local\")\\\n",
    "        .appName(\"DataProcess\")\\\n",
    "        .config(\"spark.executor.memory\",\"3g\")\\\n",
    "        .config(\"spark.executor.instances\",\"5\")\\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入user_log数据\n",
    "user_log = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入测试集\n",
    "df_train = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/test_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入用户信息\n",
    "user_info = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_info_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "source": [
    "想要建立的特征\n",
    "需要根据user_id，和merchant_id（seller_id）,从用户画像表以及用户日志表中提取特征，填写到df_train这个数据框中，从而训练评估模型 需要建立的特征如下：\n",
    "\n",
    "用户的年龄(age_range)  \n",
    "用户的性别(gender)  \n",
    "某用户在该商家日志的总条数(total_logs)  \n",
    "用户浏览的商品的数目，就是浏览了多少个商品(unique_item_ids)  \n",
    "浏览的商品的种类的数目，就是浏览了多少种商品(categories)  \n",
    "用户浏览的天数(browse_days)  \n",
    "用户单击的次数(one_clicks)  \n",
    "用户添加购物车的次数(shopping_carts)  \n",
    "用户购买的次数(purchase_times)  \n",
    "用户收藏的次数(favourite_times)  "
   ],
   "cell_type": "markdown",
   "metadata": {}
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "#age_range,gender特征添加\n",
    "df_train = df_train.join(user_info,[\"user_id\"],\"left\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "#total_logs(某用户在该商家日志的总条数)特征添加\n",
    "total_logs_temp = user_log.groupby([\"user_id\",\"seller_id\"]).count()\n",
    "#total_logs_temp.orderBy(\"user_id\").limit(20).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "total_logs_temp = total_logs_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"total_logs\")\n",
    "#total_logs_temp.limit(1).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = df_train.join(total_logs_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "#df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "#unique_item_ids特征添加\n",
    "unique_item_ids_temp = user_log.groupby([\"user_id\",\"seller_id\",\"item_id\"]).count()\n",
    "unique_item_ids_temp = unique_item_ids_temp.selectExpr(\"user_id\",\"seller_id\",\"item_id\")\n",
    "#unique_item_ids_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "unique_item_ids_temp = unique_item_ids_temp.groupBy([\"user_id\",\"seller_id\"]).count()\n",
    "unique_item_ids_temp = unique_item_ids_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"unique_item_ids\")\n",
    "#unique_item_ids_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = df_train.join(unique_item_ids_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "#df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "#categories特征构建\n",
    "categories_temp = user_log.groupby([\"user_id\", \"seller_id\", \"cat_id\"]).count()\n",
    "#categories_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 14,
   "metadata": {},
   "outputs": [],
   "source": [
    "categories_temp = categories_temp.selectExpr(\"user_id\",\"seller_id\",\"cat_id\")\n",
    "#categories_temp.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "categories_temp = categories_temp.groupBy([\"user_id\",\"seller_id\"]).count()\n",
    "categories_temp = categories_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"categories\")\n",
    "#categories_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = df_train.join(categories_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "#df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [],
   "source": [
    "#browse_days特征构建\n",
    "browse_days_temp = user_log.groupby([\"user_id\",\"seller_id\",\"time_stamp\"]).count()\n",
    "#browse_days_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [],
   "source": [
    "browse_days_temp = browse_days_temp.selectExpr(\"user_id\",\"seller_id\",\"time_stamp\")\n",
    "browse_days_temp = browse_days_temp.groupby([\"user_id\",\"seller_id\"]).count()\n",
    "browse_days_temp = browse_days_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"browse_days\")\n",
    "#browse_days_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 19,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = df_train.join(browse_days_temp,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "#df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [],
   "source": [
    "#先将处理好的暂时写到文件中\n",
    "df_train.write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/procd_test_temp.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [],
   "source": [
    "#为了避免jvm崩掉，只能另起一个session\n",
    "spark.stop()\n",
    "spark = SparkSession\\\n",
    "        .builder\\\n",
    "        .master(\"local\")\\\n",
    "        .appName(\"DataProcess2\")\\\n",
    "        .config(\"spark.executor.memory\",\"3g\")\\\n",
    "        .config(\"spark.executor.instances\",\"5\")\\\n",
    "        .getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入user_log数据\n",
    "user_log = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/data_format1/user_log_format1.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "#one_clicks、shopping_carts、purchase_times、favourite_times特征构建\n",
    "one_clicks_temp = user_log.groupby([\"user_id\",\"seller_id\",\"action_type\"]).count()\n",
    "one_clicks_temp = one_clicks_temp.withColumnRenamed(\"seller_id\",\"merchant_id\").withColumnRenamed(\"count\",\"times\")\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {
    "tags": []
   },
   "outputs": [],
   "source": [
    "from pyspark.sql import functions\n",
    "from pyspark.sql.types import *\n",
    "def click_time(action,times):\n",
    "    if action == 0:\n",
    "        return 0\n",
    "    else:\n",
    "        return times\n",
    "udf_click_time = functions.udf(click_time,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"one_clicks\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [],
   "source": [
    "def shopping_click(action,times):\n",
    "    if action == 1:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(shopping_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"shopping_carts\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [],
   "source": [
    "def purchase_click(action,times):\n",
    "    if action == 2:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(purchase_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"purchase_times\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [],
   "source": [
    "def favor_click(action,times):\n",
    "    if action == 3:\n",
    "        return times\n",
    "    else:\n",
    "        return 0\n",
    "udf_click_time = functions.udf(favor_click,IntegerType())\n",
    "one_clicks_temp = one_clicks_temp.withColumn(\"favor_times\",udf_click_time(\"action_type\",\"times\"))\n",
    "#one_clicks_temp.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 9,
   "metadata": {},
   "outputs": [],
   "source": [
    "four_features = one_clicks_temp.groupby([\"user_id\",\"merchant_id\"]).sum()\n",
    "#four_features.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 10,
   "metadata": {},
   "outputs": [],
   "source": [
    "four_features = four_features.selectExpr(\"user_id\",\"merchant_id\",\"`sum(one_clicks)` as one_clicks\",\n",
    "\"`sum(shopping_carts)` as shopping_carts\",\"`sum(purchase_times)` as purchase_times\",\"`sum(favor_times)` as favor_times\")\n",
    "#four_features.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [],
   "source": [
    "#导入test_data数据\n",
    "df_train = spark.read.csv(r\"hdfs://node1:9000/user/root/exp4/procd_test_temp.csv\", encoding='utf8', header=True, inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train = df_train.join(four_features,[\"user_id\",\"merchant_id\"],\"left\")\n",
    "#df_train.limit(10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [],
   "source": [
    "#df_train.write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/procd_train_real.csv\")\n",
    "#df_train.write.parquet(\"hdfs://node1:9000/user/root/exp4/procd_train_real.parquet\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [],
   "source": [
    "#填充缺失值\n",
    "#第一种策略是将后8个特征所有null值填充为0\n",
    "df_train_filled = df_train.fillna(0)\n",
    "#df_train_filled.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [],
   "source": [
    "df_train_filled.write.options(header=\"true\").csv(\"hdfs://node1:9000/user/root/exp4/procd_test_real.csv\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [],
   "source": [
    "#将数据转为合适的格式\n",
    "from pyspark.mllib.regression import LabeledPoint\n",
    "from pyspark.mllib.linalg import Vectors\n",
    "#先转成RDD\n",
    "df_train_rdd = df_train_filled.rdd\n",
    "#改成(label,features)的格式\n",
    "df_train_rdd = df_train_rdd.map(lambda line: LabeledPoint(0,Vectors.dense(line[3:])))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 38,
   "metadata": {},
   "outputs": [],
   "source": [
    "#保存为LibSVMFile格式，方便后面训练使用\n",
    "from pyspark.mllib.util import MLUtils\n",
    "MLUtils.saveAsLibSVMFile(df_train_rdd, \"hdfs://node1:9000/user/root/exp4/procd_test_real\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 39,
   "metadata": {},
   "outputs": [],
   "source": [
    "#别忘了关掉session\n",
    "spark.stop()"
   ]
  }
 ]
}